c45e11cfba168e1544e97b86d6d0782109ef8be4,src/main/java/com/couchbase/client/dcp/conductor/Conductor.java,Conductor,startStreamForPartition,#number#number#number#number#number#number#,160

Before Change


    /**
     * Opens a stream for the given partition on the channel returned by
     * {@code masterChannelByPartition(partition)}.
     *
     * <p>If that channel is not in the {@code CONNECTED} state, nothing is opened right away.
     * Instead the returned {@link Completable} waits 100 milliseconds and then invokes this
     * method again with the same arguments, repeating until the channel reports
     * {@code CONNECTED}.</p>
     *
     * <p>All sequence number arguments and the vbuuid are passed through unchanged to
     * {@code DcpChannel#openStream}.</p>
     *
     * @param partition the partition (vbucket) id to open the stream for.
     * @param vbuuid forwarded to the channel's {@code openStream} call.
     * @param startSeqno forwarded to the channel's {@code openStream} call.
     * @param endSeqno forwarded to the channel's {@code openStream} call.
     * @param snapshotStartSeqno forwarded to the channel's {@code openStream} call.
     * @param snapshotEndSeqno forwarded to the channel's {@code openStream} call.
     * @return the {@link Completable} from {@code openStream}, or one that first delays and
     *         then re-attempts the whole operation when the channel is not connected.
     */
    public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno,
        final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
        final DcpChannel master = masterChannelByPartition(partition);

        // Happy path first: the channel is ready, so open the stream immediately.
        if (master.state() == LifecycleState.CONNECTED) {
            return master.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno);
        }

        LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);

        // Once the timer fires, re-enter this method; the channel is looked up and checked again.
        final Func1<Long, Observable<?>> retryAfterDelay = new Func1<Long, Observable<?>>() {
            @Override
            public Observable<?> call(Long tick) {
                final Completable nextAttempt = startStreamForPartition(partition, vbuuid, startSeqno, endSeqno,
                    snapshotStartSeqno, snapshotEndSeqno);
                return nextAttempt.toObservable();
            }
        };

        return Observable
            .timer(100, TimeUnit.MILLISECONDS)
            .flatMap(retryAfterDelay)
            .toCompletable();
    }

    public Completable stopStreamForPartition(final short partition) {

After Change


                        .toObservable();
                }
            })
            .retryWhen(anyOf(NotConnectedException.class)
                .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
                .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
                    @Override
                    public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                        LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);

                    }
                })
                .build()
            )
            .toCompletable();
    }